package com.flink.day07_flink_cdc;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demonstrates reading a MySQL table as a changelog stream via the Flink CDC SQL connector
 * and printing the continuous query result to stdout.
 *
 * @author HaoWu
 * @since 2021-05-24
 */
public class Flink02_Flink_CDC_Sql {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment and its Table API bridge.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed changelog output in a single, ordered stream.
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a Flink-MySQL-CDC source table.
        //    NOTE: each option key may appear only once in the WITH clause — a duplicated
        //    key (e.g. 'password' listed twice) fails validation when the DDL is executed.
        String sourceSql = "create table user_info(\n" +
                "    id int,\n" +
                "    name string,\n" +
                "    age int\n" +
                ")\n" +
                "with(\n" +
                "  'connector'='mysql-cdc',\n" +
                "  'hostname'='hadoop102',\n" +
                "  'port'='3306',\n" +
                "  'username'='root',\n" +
                "  'password'='root',\n" +
                "  'database-name'='gmall-flink-200821',\n" +
                "  'table-name'='z_user_info'\n" +
                ")";
        tableEnv.executeSql(sourceSql);

        // 3. Run the continuous query and print the changelog rows.
        //    executeSql() submits the job itself; print() blocks while the (unbounded)
        //    CDC stream is consumed. Do NOT call env.execute() afterwards — no DataStream
        //    operators were defined, so it would throw
        //    "No operators defined in streaming topology".
        tableEnv.executeSql("select * from user_info").print();
    }
}